package com.atguigu.flink.sql.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

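/**
 * Created by Smexy on 2023/9/16
 *
 * Time windows are supported by both the Table API and SQL (this demo covers SQL).
 * SQL offers two syntaxes:
 *     Group Window (legacy)
 *     Window TVF   (new)
 *
 * Window aggregation reference:
 * https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
 *     Flink originally used group-window aggregation; it is now deprecated and
 *     window TVFs are strongly recommended instead.
 *
 *     Window TVFs are more powerful:
 *         they can be optimized by the planner
 *         they can be used for Top-N
 *         they support GROUPING SETS
 * -------------------------------------------
 * Session windows:              group-window aggregation only; not supported as a window TVF
 * Tumbling and sliding windows: supported as window TVFs
 * Cumulative windows:           supported as window TVFs
 */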


public class Demo2_TimeWindow
{
    /**
     * 5-second tumbling window on the processing-time attribute {@code pt}.
     *
     * Whether the window is global or keyed is expressed purely through GROUP BY:
     * grouping by {@code window_start, window_end} gives a global window, while
     * adding {@code id} to the GROUP BY list would make it a keyed window.
     * {@code window_start} / {@code window_end} are the start and end time of each window.
     */
    private static final String TUMBLE_SQL =
        " select  window_start, window_end , sum(vc) sumVc" +
        " from TABLE( " +
        " TUMBLE(TABLE t1, DESCRIPTOR(pt), INTERVAL '5' SECOND ) )  " +
        " GROUP BY window_start, window_end  ";

    /**
     * Global event-time sliding window with size = 6s and slide = 3s.
     *
     * Syntax: HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ]).
     * The window TVF requires size to be an integral multiple of slide.
     *
     * The first window to fire ends at (0 - slide) + size = 3s:
     *     first window:  [-3000, 2999]
     *     second window: [0, 5999]
     */
    private static final String HOP_SQL =
        " select  window_start, window_end , sum(vc) sumVc" +
        " from TABLE( " +
        " HOP(TABLE t1, DESCRIPTOR(et), INTERVAL '3' SECOND , INTERVAL '6' SECOND) )  " +
        " GROUP BY window_start, window_end  ";

    /**
     * Event-time session window with a 3-second gap.
     *
     * Session windows must use the legacy group-window syntax; the window TVF
     * form does not support them yet (per the Flink 1.17 documentation):
     * https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
     */
    private static final String SESSION_SQL =
        " select SESSION_START(et, INTERVAL '3' SECOND) , " +
        "            SESSION_end(et, INTERVAL '3' SECOND) , " +
        " sum(vc) sumVc " +
        " from t1 " +
        " GROUP BY  SESSION( et, INTERVAL '3' SECOND)";

    /**
     * Event-time cumulative window: emits a result every 2s, accumulating up to 6s.
     *
     *     1st window: [0, 2s)
     *     2nd window: [0, 4s)
     *     3rd window: [0, 6s)
     *     4th window: [6, 8s)
     *     5th window: [6, 10s)
     *     6th window: [6, 12s)
     */
    private static final String CUMULATE_SQL =
        " select  window_start, window_end , sum(vc) sumVc" +
        " from TABLE( " +
        " CUMULATE(TABLE t1, DESCRIPTOR(et), INTERVAL '2' SECOND , INTERVAL '6' SECOND) )  " +
        " GROUP BY window_start, window_end  ";

    /**
     * Reads text lines from the socket {@code hadoop102:8888}, maps them to
     * {@link WaterSensor} records, registers the stream as the temporary view
     * {@code t1}, then runs one windowed aggregation query and prints its result.
     *
     * To try a different window type, pass one of the other SQL constants of this
     * class to {@code sqlQuery} instead of {@link #CUMULATE_SQL}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // WaterSensorMapFunction is project-defined; presumably it parses each text line
        // into a WaterSensor -- verify against its implementation.
        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        // Physical columns id/ts/vc plus two computed time attributes:
        //   pt - processing time
        //   et - event time derived from ts, read as epoch milliseconds (precision 3)
        // The watermark trails et by 1 ms.
        Schema schema = Schema.newBuilder()
            .column("id", "STRING")
            .column("ts", "BIGINT")
            .column("vc", "INT")
            .columnByExpression("pt", "PROCTIME()")
            .columnByExpression("et", "TO_TIMESTAMP_LTZ(ts,3)")
            .watermark("et", "et - INTERVAL '0.001' SECOND")
            .build();

        tableEnv.createTemporaryView("t1", ds, schema);

        // Table.execute() submits the job itself and print() blocks while results arrive.
        // No DataStream sink is attached to env, so a trailing env.execute() is not needed:
        // it would only run after the table job has ended, and would then either fail or
        // start a second job without any sink.
        tableEnv.sqlQuery(CUMULATE_SQL).execute().print();
    }
}
